/**
 * Project Name: ccopsms
 * File Name: CmppShContainer.java
 * Package Name: com.cloud.demo.cmppsh.socket
 * Date: 2016-02-18 13:18:17
 * Copyright (c) 2016, LiHao All Rights Reserved.
 *
*/

package com.cloud.demo.cmppsh.socket;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloud.demo.bean.SendMsgMo;
import com.cloud.demo.bean.SendMsgReport;
import com.cloud.demo.bean.SendMsgReq;
import com.cloud.demo.cmpp.CmppCommand;
import com.cloud.demo.cmpp.msg.MsgActiveTestResp;
import com.cloud.demo.cmpp.msg.MsgConnect;
import com.cloud.demo.cmpp.msg.MsgConnectResp;
import com.cloud.demo.cmpp.msg.MsgDeliver;
import com.cloud.demo.cmpp.msg.MsgDeliverResp;
import com.cloud.demo.cmpp.msg.MsgHead;
import com.cloud.demo.cmpp.msg.MsgSubmit;
import com.cloud.demo.cmpp.msg.MsgSubmitResp;
import com.cloud.demo.redis.RedisDao;
import com.cloud.demo.utils.ByteConvert;
import com.cloud.demo.utils.Constants;
import com.cloud.demo.utils.DateUtils;
import com.cloud.demo.utils.GenerateId;
import com.cloud.demo.utils.JacksonUtil;
import com.cloud.demo.utils.MsgUtils;
import com.cloud.demo.utils.SpringConfigTool;


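/**
 * ClassName: CmppShContainer <br/>
 * Function: Singleton CMPP client for the Shanghai Mobile SMS gateway (ISMG). It owns the
 * socket, logs in to the gateway, runs a periodic link check, reconnects after failures,
 * drains the outgoing request queue and dispatches frames received from the gateway. <br/>
 * Date:     2016-02-18 13:18:17 <br/>
 * @author   LiHao
 * @since    JDK 1.6
 */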
public class CmppShContainer {
	private static Logger logger = LoggerFactory.getLogger(CmppShContainer.class);
	// Socket connected to the gateway; replaced on every (re)connect.
	private volatile static Socket msgSocket;
	private static int timeout = 1000 * 60; // how long sendReqMsg waits for a response, in ms (60 seconds)
	private static int reconnTime=1000*10;  // pause before each reconnect attempt, in ms
	// Streams of msgSocket. volatile: they are swapped by the (re)connect code and used by other threads.
	private static volatile InputStream inStream = null;
	private static volatile OutputStream outStream = null;
	private static java.util.Timer heartTimer = null;
	// volatile: written by the reconnect path and read by the connect path on different threads.
	private static volatile boolean isLaunchHeartcheck = false;// whether the link-check timer has been started
	// volatile: polled in a sleep loop by the sender thread, so updates must be visible across threads.
	private static volatile boolean isNetworkConnect = false; // whether the socket is currently connected
	// Keyed by sequence id: holds the waiting sender's Condition until the receiver swaps in the response map.
	private final static ConcurrentHashMap<Integer, Object> recMsgMap = new ConcurrentHashMap<Integer, Object>();
	private static Thread receiveThread = null;
	private final static ReentrantLock lock = new ReentrantLock();
	private static CmppShContainer container=null;
	// volatile: checked by the heartbeat timer and the receive thread to avoid starting a second reconnect.
	private static volatile boolean isReConnection=false; // true while a reconnect is in progress
	private static String host;
	private static int port;
	private static String spId;
	private static String sharedSecret;
	private static String spCode="";
	// Queue of outgoing send requests
	public static ConcurrentLinkedQueue<SendMsgReq> smsReqQueue = new ConcurrentLinkedQueue<SendMsgReq>();
	// Signals the sender thread that the request queue has work
	public static Semaphore smsReqQueueSemp  = new Semaphore(0);
	public static Semaphore smsQpsSemp  = new Semaphore(Constants.CMPP_QPS_SHYD);
	
	//private static PushDataService pushDataService =SpringConfigTool.getBean("pushDataService");
	private static RedisDao redisDao=SpringConfigTool.getBean("redisDao");
	
	/**
	 * Returns the shared container, creating it on the first call.
	 * Creation runs the constructor, which opens the gateway connection.
	 *
	 * @return the single CmppShContainer instance
	 */
	public static synchronized CmppShContainer getInstance(){
		if(container!=null){
			return container;
		}
		container=new CmppShContainer();
		return container;
	}
	/**
	 * Fills in the gateway endpoint and SP credentials, opens the connection through
	 * {@code init} and starts the thread that drains the send queue.
	 * A failure is logged rather than rethrown; in that case the sender thread is not started.
	 */
	private CmppShContainer(){
		try {
			// NOTE(review): endpoint and credentials are hard-coded here; the removed commented-out
			// code read them from ReadConfigation ("cmpp_ip_sh" / "cmpp_port_sh").
			host="127.0.0.1";
			port=5001;
			spId="222222";
			sharedSecret="222222";
			// The shared secret is intentionally kept out of the log.
			logger.info("【cmpp 上海移动】host:{},port:{},spId:{},spCode:{}",new Object[]{host,port,spId,spCode});
			init(host,port);
			new Thread(new ExeSendQuene()).start();
		} catch (Exception e) {
			// Pass the exception to the logger so the stack trace goes to the log, not stderr.
			logger.error("【cmpp Socket链接短信网关失败】："+e.getMessage(),e);
		}
	}
	/**
	 * Opens a socket to the gateway, starts the receive thread, performs the CMPP login
	 * and, if it is not already running, starts the periodic link check.
	 * A rejected login is only logged; the connection is kept.
	 *
	 * @param host gateway host
	 * @param port gateway port
	 * @throws Exception if the socket cannot be created or configured
	 */
	private void init(String host, int port) throws Exception{
		logger.info("【cmpp 创建Socket】host:{},port:{}",new Object[]{host,port});
		inStream=null;
		outStream=null;
		Socket socket=new Socket(host,port);
		try{
			logger.info("【cmpp socket已建立】");
			socket.setKeepAlive(true);
			socket.setTcpNoDelay(true);// send immediately, no Nagle buffering
			socket.setSoLinger(true, 0);// release resources immediately on close
			socket.setTrafficClass(0x04 | 0x10);// IPTOS_RELIABILITY | IPTOS_LOWDELAY
			InputStream in=socket.getInputStream();
			OutputStream out=socket.getOutputStream();
			msgSocket=socket;
			inStream=in;
			outStream=out;
		}catch(Exception e){
			// Do not leak a half-configured socket; the reconnect loop calls init repeatedly.
			try{
				socket.close();
			}catch(IOException closeError){
				logger.warn("【cmpp 创建Socket】关闭未完成初始化的socket失败",closeError);
			}
			throw e;
		}
		// Publish "connected" only after both streams are in place, so that a thread which
		// sees the flag never finds a null stream.
		isNetworkConnect=true;
		isReConnection=false;
		logger.info("【cmpp 新建消息接收线程】");
		receiveThread = new Thread(new ReceiveWorker(),"cmppshReceiveThread"); // the receiver must run before login so the response can be read
		receiveThread.start();
		boolean ismgFlag=connectISMG();
		if(ismgFlag){
			logger.info("【cmpp 链接ISMG成功】");
		}else{
			logger.warn("【cmpp 链接ISMG失败】");
		}
		if(!isLaunchHeartcheck){
			logger.info("【cmpp 启动链路检测任务】");
			launchHeartcheck();
		}
	}
	
	/**
	 * Starts the link-check timer: first run after 10 seconds, then every 18 seconds.
	 * Each run repeats the active test until it succeeds or has failed
	 * Constants.TEST_CONN_COUNT times; at that point a reconnect is started unless
	 * one is already in progress.
	 *
	 * @throws IllegalStateException if there is no open, connected socket
	 */
	private void launchHeartcheck() {
		boolean socketUsable=null!=msgSocket&&!msgSocket.isClosed()&&msgSocket.isConnected();
		if(!socketUsable){
			throw new IllegalStateException("socket is not established!");
		}
		heartTimer = new Timer();
		isLaunchHeartcheck = true;
		TimerTask linkCheck=new TimerTask() {
			public void run() {
				int failures=0;
				while(!activityTestISMG()){
					failures++;
					logger.info("【cmpp 链路检测失败】count:"+failures);
					if(failures<Constants.TEST_CONN_COUNT){
						continue;
					}
					// Failure limit reached: hand over to the reconnect logic and stop this run.
					if(isReConnection){
						logger.info("【cmpp 正在重连,退出检测】"); 
					}else{
						logger.warn("【cmpp 链路检测失败次数超过,发起重连】");
						reConnection();
					}
					return;
				}
			}
		};
		heartTimer.schedule(linkCheck, 10 * 1000, 18 * 1000);
	}
	/**
	 * Tears down the current connection and starts a background thread that retries
	 * {@code init} every {@code reconnTime} milliseconds until it succeeds.
	 * Only one reconnect runs at a time; further calls while one is in progress just log and return.
	 */
	private void reConnection(){
		// Both the heartbeat timer and the receive thread can get here; make the
		// check-and-set atomic so only one of them starts a reconnect.
		synchronized(this){
			if(isReConnection){
				logger.info("【cmpp正在重连】");
				return;
			}
			isReConnection=true;
		}
		logger.info("【cmpp重连】重新建立与"+host+":"+port+"的连接");
		// Cleanup: stop the timer, interrupt the receiver, reset the state flags.
		if(heartTimer!=null){
			heartTimer.cancel();
		}
		isLaunchHeartcheck=false;
		isNetworkConnect=false;

		if(receiveThread!=null&&!receiveThread.isInterrupted()){
			logger.info("【cmppsh重连】中断receiveThread线程");
			receiveThread.interrupt();
		}
		try{
			if(msgSocket!=null){
				msgSocket.close();
			}
		}catch(IOException e1){
			logger.error("【cmppsh重连】关闭socket连接发生IO流异常",e1);
		}
		new Thread(new Runnable(){
			public void run(){
				for(;;){
					logger.warn("【cmpp reConnection 进行重连】");
					try{
						Thread.sleep(reconnTime);
						init(host,port);
						// Senders blocked in sendReqMsg wait on the class monitor, so that is
						// the monitor to notify once the link is back.
						synchronized(CmppShContainer.class){
							CmppShContainer.class.notifyAll();
						}
						break;
					}catch (Exception e) {
						logger.error("【cmpp 重新链接短信网关失败】："+e.getMessage(),e);
					}
				}
			}
		}).start();
	}
	/**
	 * Sends one CMPP_ACTIVE_TEST frame (header only, 12 bytes) and waits for the reply.
	 *
	 * @return true if a CMPP_ACTIVE_TEST_RESP was received; false for any other reply or on any exception
	 */
	public static boolean activityTestISMG(){
		try {
			final int seq=getSequence();
			logger.info("【cmpp 链路检查】"+seq);
			MsgHead probe=new MsgHead();
			probe.setTotalLength(12);// header only: total length + command id + sequence id
			probe.setCommandId(CmppCommand.CMPP_ACTIVE_TEST);
			probe.setSequenceId(seq);

			List<byte[]> frames=new ArrayList<byte[]>();
			frames.add(probe.toByteArry());
			Map<String, Object> reply=sendReqMsg(frames,seq);
			Integer replyCommand =(Integer)reply.get("commandId");
			if(replyCommand!=CmppCommand.CMPP_ACTIVE_TEST_RESP){
				return false;
			}
			MsgActiveTestResp resp =(MsgActiveTestResp)reply.get("resp");
			logger.info("【cmpp 链路检查成功】"+resp.getSequenceId());	
			return true;
		} catch (Exception e) {
			logger.error("【cmpp 链路检查异常】"+e.getMessage());	
			e.printStackTrace();
			return false;
		}
	}
	/**
	 * Logs in to the gateway: sends CMPP_CONNECT with the SP id, the authenticator
	 * built from SP id, shared secret and timestamp, and protocol version 0x30,
	 * then waits for CMPP_CONNECT_RESP.
	 *
	 * @author LiHao
	 * @return true only if a CMPP_CONNECT_RESP with status 0 was received
	 * @since JDK 1.6
	 */
	private static boolean connectISMG(){
		// The shared secret is intentionally kept out of the log.
		logger.info("【cmpp 连接ISMG】,Source_Addr:{}",new Object[]{spId});
		boolean flag=false;
		MsgConnect connect=new MsgConnect();
		int sequenceId=getSequence();
		connect.setTotalLength(12+6+16+1+4);// header 4+4+4, body 6+16+1+4
		connect.setCommandId(CmppCommand.CMPP_CONNECT);
		connect.setSequenceId(sequenceId);
		connect.setSourceAddr(spId);
		String timestamp=MsgUtils.getTimestamp();
		connect.setAuthenticatorSource(MsgUtils.getCmppAuthenticatorSource(spId,sharedSecret,timestamp));
		connect.setTimestamp(Integer.parseInt(timestamp));// timestamp, MMDDHHMMSS
		connect.setVersion((byte)0x30);// high nibble 3, low nibble 0
		List<byte[]> dataList=new ArrayList<byte[]>();
		dataList.add(connect.toByteArry());
		try {
			logger.debug("【cmpp sequenceId】:"+sequenceId);
			Map<String, Object> map=sendReqMsg(dataList,sequenceId);
			Integer commandId =(Integer)map.get("commandId");
			if(commandId==CmppCommand.CMPP_CONNECT_RESP){
				MsgConnectResp resp =(MsgConnectResp)map.get("resp");
				if(resp.getStatus()==0){
					flag=true;
					logger.info("【cmpp 返回连接ISMG成功】");
				}else{
					logger.warn("【cmpp 连接ISMG失败】Status:{},{}",new Object[]{resp.getStatus(),resp.getStatusStr()});
				}
			}
		} catch (IOException e) {
			// A failed login is a real problem: log it as a warning with the stack trace.
			logger.warn("【cmpp 连接ISMG异常】"+e.getMessage(),e);
		}
		return flag;
	}
	/**
	 * Queues a send request and returns the id assigned to it.
	 * A request with several comma-separated numbers is split into one queued
	 * request per number, all sharing the same response id.
	 *
	 * @author LiHao
	 * @param req the request to queue; its respId is set by this method
	 * @return a map with "respId" (the generated id) and "result" "0" when queued,
	 *         or "respId" null and "result" "-5" when the network is not connected
	 * @since JDK 1.6
	 */
	public static Map<String, String> putSendMsg(SendMsgReq req){
		Map<String, String> map=new HashMap<String,String>();
		if(!isNetworkConnect){
			map.put("respId", null);
			map.put("result", "-5");
			return map;
		}
		String respId=GenerateId.getUUID();
		req.setRespId(respId);
		logger.info("【cmpp入队列】"+JacksonUtil.writeValueAsString(req));
		String[] cusMsisdn=req.getTel().split(",");
		int len=cusMsisdn.length;
		if(len>1){
			for (int i = 0; i < len; i++) {
				SendMsgReq reqClone= req.clone();
				reqClone.setTel(cusMsisdn[i]);
				smsReqQueue.offer(reqClone);
			}
			// The sender consumes one permit per queued request, so release one per
			// element; a single release would leave len-1 requests stuck in the queue.
			smsReqQueueSemp.release(len);
		}else{
			smsReqQueue.offer(req);
			smsReqQueueSemp.release();
		}
		map.put("respId", respId);
		map.put("result", "0");
		return map;
	}
	
	/**
	 * Takes one request from the send queue and sends it.
	 * Blocks until the queue semaphore signals work, takes a QPS permit, waits
	 * (polling once per second) until the network is connected, then sends the head of the queue.
	 *
	 * @author LiHao
	 * @since JDK 1.6
	 */
	public static void takeSendMsg(){
		try {
			smsReqQueueSemp.acquire();
			logger.debug("【cmppSh】获取发送队列信号量");
		} catch (InterruptedException e) {
			logger.error("【cmppSh】取发送队列信号量获取异常"+e.toString(),e);
		}
		// Remember whether the permit was really obtained so that it is released
		// exactly when it was acquired.
		boolean qpsAcquired=false;
		try {
			smsQpsSemp.acquire();
			qpsAcquired=true;
			logger.debug("【cmppSh】获取发送队列并发信号量");
		} catch (InterruptedException e) {
			logger.error("【cmppSh】取发送队列并发信号量获取异常"+e.toString(),e);
		}
		try{
			if(!isNetworkConnect){
				while(!isNetworkConnect){
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						logger.warn("【cmppSh】等待网络恢复时被中断",e);
					}
				}
				logger.debug("【cmppSh】break");
			}
			SendMsgReq req=smsReqQueue.poll();
			if(req!=null){
				sendMsg(req);
			}
		}finally{
			// Give the permit back even when sendMsg throws.
			if(qpsAcquired){
				smsQpsSemp.release();
			}
		}
	}
	/**
	 * Hook for pushing a serialized status report or uplink message to a notification URL.
	 * Currently a no-op: the asynchronous push below is commented out, so {@code info} is discarded.
	 *
	 * @param info the serialized payload that would be pushed
	 */
	private static void pushInfo(String info) {
		//String cmppNoticeUrl=ReadConfigation.getConfigItem("cmppNoticeUrl_sh");
		//pushDataService.postDataByAsynMode(cmppNoticeUrl, info,"cmpp_sh");
	}
	/**
	 * Sends one SMS request to the gateway.
	 * Stores the sequence-id to respId/outId mapping in Redis, encodes the text as
	 * UCS2 and sends it as a short or long message depending on its length
	 * (the signature length counts against the 140-byte limit). If no response arrives
	 * or the gateway reports a non-zero result, a failure status report is pushed.
	 *
	 * @author LiHao
	 * @param req the request; tel may hold several comma-separated numbers
	 * @return the gateway's submit response, or null on timeout or any exception
	 * @since JDK 1.6
	 */
	public static MsgSubmitResp sendMsg(SendMsgReq req){
		try{
			// Read the request inside the try so that a malformed request (e.g. null tel)
			// is logged and reported as a failed send instead of escaping to the sender thread.
			String msg=req.getMsg();
			String[] cusMsisdn=req.getTel().split(","); 
			String appendCode=req.getAppendCode();
			String sign=req.getSignature();
			int signLen=0;
			logger.info("【cmpp sendMsg】appendCode：{},cusMsisdn:{},msg:{},sign:{}",new Object[]{appendCode,req.getTel(),msg,sign});
			int sequenceId=getSequence();
			// Keep seqId -> (respId, outId) in Redis so the submit response can be correlated later.
			Map<String,String> map=new HashMap<String,String>();
			map.put("respId", req.getRespId());
			map.put("outId", req.getOutId());
			map.put("seqId", sequenceId+"");
			logger.info("【cmppSH redis保存seqid_respid】,key:{},respId:{},outId:{},seqId:{}",new Object[]{Constants.SMS_REDIS_SEQ_CMPPSH+sequenceId,req.getRespId(),req.getOutId(),sequenceId});
			redisDao.setMapExpire(Constants.SMS_REDIS_SEQ_CMPPSH+sequenceId, map, Constants.SMS_EXPIRE_SECONDS);
			if(StringUtils.isNotBlank(sign)){
				signLen=sign.getBytes("UnicodeBigUnmarked").length;
			}
			byte msgByte[]=msg.getBytes("UnicodeBigUnmarked");
			// Log the number list itself; concatenating the array would only print its identity.
			logger.debug("msgByte.length:"+msgByte.length+",cusMsisdn:"+req.getTel());
			MsgSubmitResp resp=null;
			if(msgByte.length<=140-signLen){// fits into a single message
				resp=sendShortMsg(msgByte,cusMsisdn, appendCode,sequenceId);
			}else{// needs segmentation
				resp=sendLongMsg(msgByte,cusMsisdn,appendCode,sequenceId);
			}
			if(resp==null||resp.getResult()!=0){
				SendMsgReport report=new SendMsgReport(1,Constants.SMS_CARRIER_TYPE_CMPPSH);
				report.setTel(req.getTel());
				report.setSeqId(sequenceId+"");
				report.setMsgId(req.getRespId());
				report.setOutId(req.getOutId());
				if(resp==null){
					report.setStat("rstimeout");
				}else{
					report.setStat("rs"+resp.getResult());
				}
				String now=DateUtils.formatDate(new Date(),"yyMMddHHmmss");
				report.setSubmitTime(now);
				report.setDoneTime(now);
				// Push the failure status report
				String reportStr=JacksonUtil.writeValueAsString(report);
				logger.warn("【cmppsh 推送发送异常状态报告】"+reportStr);
				pushInfo(reportStr);
			}
			return resp;
		}catch(Exception e){
			logger.error("【cmppsh sendMsg】error"+e.getMessage(),e);
			return null;
		}
	}
	/**
	 * Sends a message that fits into a single CMPP_SUBMIT.
	 *
	 * @param msgByte    UCS2-encoded message content
	 * @param cusMsisdn  destination numbers
	 * @param appendCode extension appended to the SP code to form the source id
	 * @param sequenceId sequence id used for the frame and for matching the response
	 * @return the submit response, or null if the reply was not a CMPP_SUBMIT_RESP,
	 *         timed out, or sending failed
	 */
	private static MsgSubmitResp sendShortMsg(byte msgByte[],String[] cusMsisdn,String appendCode,int sequenceId){
		try {		
			int numCount=cusMsisdn.length;
			int byteLen=32*numCount;
			MsgSubmit submit=new MsgSubmit();
			submit.setTotalLength(12+8+1+1+1+1+10+1+32+1+1+1+1+6+2+6+17+17+21+1+byteLen+1+1+msgByte.length+20);
			submit.setCommandId(CmppCommand.CMPP_SUBMIT);
			submit.setSequenceId(sequenceId);		
			// Msg_Id is left unset
			submit.setPkTotal((byte)0x01);  // number of messages sharing this Msg_Id
			submit.setPkNumber((byte)0x01);	// index of this message, starting at 1
			submit.setRegisteredDelivery((byte)0x01); // 1: request a status report
			submit.setMsgLevel((byte)0x01); // message priority level
			submit.setServiceId("");// service id
			submit.setFeeUserType((byte)0x02);// 2: charge the SP
			submit.setFeeTerminalId("");			
			submit.setFeeTerminalType((byte)0x00); // charged number type, 0: real number, 1: pseudo code
			submit.setTpPId((byte)0x00);
			submit.setTpUdhi((byte)0x00);
			submit.setMsgFmt((byte)0x08);             // message format 8: UCS2
			submit.setMsgSrc(spId);    // message source (SP_Id)
			submit.setFeeType("02");      // fee type
			submit.setFeeCode("5");  // fee code
			// ValId_Time and At_Time are left unset
			submit.setDestUsrTl((byte)numCount); // number of recipients
			submit.setSrcId(spCode+appendCode); // sender number
			submit.setDestTerminalId(cusMsisdn);      // recipient numbers
			submit.setMsgLength((byte)(msgByte.length));  // content length
			submit.setMsgContent(msgByte);  // content

			List<byte[]> dataList=new ArrayList<byte[]>();
			dataList.add(submit.toByteArry());

			logger.info("【cmpp向{}下发短短信】序列号为:{}",new Object[]{cusMsisdn,sequenceId});
			Map<String, Object> map=sendReqMsg(dataList,sequenceId);
			Integer commandId =(Integer)map.get("commandId");
			if(commandId==CmppCommand.CMPP_SUBMIT_RESP){
				return (MsgSubmitResp)map.get("resp");
			}
		} catch (Exception e) {
			// Covers I/O failures and timeouts as well; pass the exception itself so the
			// cause and stack trace are not lost.
			logger.error("【cmpp向"+StringUtils.join(cusMsisdn,",")+"下发短短信】序列号为:"+sequenceId+",发送异常",e);
		}
		return null;
	}
	
	/**
	 * Sends a message that does not fit into a single CMPP_SUBMIT.
	 * The content is cut into 128-byte chunks, each prefixed with a 6-byte
	 * concatenation header (05 00 03, reference, total, index), and every segment
	 * is submitted with the same sequence id. The first response received is returned.
	 *
	 * @param allByte    UCS2-encoded message content
	 * @param cusMsisdn  destination numbers
	 * @param appendCode extension appended to the SP code to form the source id
	 * @param sequenceId sequence id for all segments; its low byte is the concatenation reference
	 * @return the submit response, or null if the reply was not a CMPP_SUBMIT_RESP,
	 *         timed out, or sending failed
	 */
	private static MsgSubmitResp sendLongMsg(byte[] allByte,String[] cusMsisdn,String appendCode,int sequenceId){
		try {
			int numCount=cusMsisdn.length;
			int byteLen=32*numCount;
			List<byte[]> dataList=new ArrayList<byte[]>();
			int maxLength=134; // bytes per segment including the header
			int headLength=6;
			int chunkLength=maxLength-headLength;
			// ceil(length / chunkLength)
			int msgSendCount=(allByte.length+chunkLength-1)/chunkLength;
			// Concatenation header, shared by all segments except for the index byte.
			byte[] msgHead=new byte[headLength];
			msgHead[0]=0x05;
			msgHead[1]=0x00;
			msgHead[2]=0x03;
			msgHead[3]=(byte)sequenceId;
			msgHead[4]=(byte)msgSendCount;
			msgHead[5]=0x01;

			for(int i=0;i<msgSendCount;i++){
				msgHead[5]=(byte)(i+1);
				int start=chunkLength*i;
				// The last segment simply ends at the end of the content.
				int end=Math.min(start+chunkLength,allByte.length);
				byte[] needMsg=MsgUtils.getMsgBytes(allByte,start,end);
				int subLength=needMsg.length+msgHead.length;
				byte[] segment=new byte[subLength];
				System.arraycopy(msgHead,0,segment,0,msgHead.length);
				System.arraycopy(needMsg,0,segment,msgHead.length,needMsg.length);
				MsgSubmit submit=new MsgSubmit();
				submit.setTotalLength(12+8+1+1+1+1+10+1+32+1+1+1+1+6+2+6+17+17+21+1+byteLen+1+1+subLength+20);
				submit.setCommandId(CmppCommand.CMPP_SUBMIT);
				submit.setSequenceId(sequenceId);			
				submit.setPkTotal((byte)msgSendCount);
				submit.setPkNumber((byte)(i+1));		
				submit.setRegisteredDelivery((byte)0x01);
				submit.setMsgLevel((byte)0x01);
				// NOTE(review): unlike sendShortMsg, ServiceId, FeeType and FeeCode are not set here — confirm this is intended.
				submit.setFeeUserType((byte)0x02);
				submit.setFeeTerminalId("");			
				submit.setFeeTerminalType((byte)0x00);			
				submit.setTpPId((byte)0x00);
				submit.setTpUdhi((byte)0x01); // content starts with a user data header
				submit.setMsgFmt((byte)0x08);
				submit.setMsgSrc(spId);
				submit.setSrcId(spCode+appendCode);
				submit.setDestUsrTl((byte)numCount);
				submit.setDestTerminalId(cusMsisdn);
				submit.setMsgLength((byte)subLength);
				submit.setMsgContent(segment);
				dataList.add(submit.toByteArry());
			}
			logger.info("【cmpp向{}下发长短信】序列号为:{},msgSendCount:{}",new Object[]{cusMsisdn,sequenceId,msgSendCount});
			Map<String, Object> map=sendReqMsg(dataList,sequenceId);
			Integer commandId =(Integer)map.get("commandId");
			if(commandId==CmppCommand.CMPP_SUBMIT_RESP){
				return (MsgSubmitResp)map.get("resp");
			}
		} catch (Exception e) {
			// Covers I/O failures and timeouts as well; pass the exception itself so the
			// cause and stack trace are not lost.
			logger.error("【cmpp向"+StringUtils.join(cusMsisdn,",")+"下发长短信】序列号为:"+sequenceId+",发送异常",e);
		}
		return null;
	}
	/**
	 * Writes the given frames to the gateway and blocks until the receive thread
	 * delivers the response for {@code sequenceId} or {@code timeout} milliseconds pass.
	 * If the network is down, waits up to 5 seconds for it to come back first.
	 *
	 * @param dataList   encoded frames to write, in order; null makes the method return null
	 * @param sequenceId sequence id under which the response is expected
	 * @return the response map ("commandId" and, where the receiver stored one, "resp"), or null if dataList is null
	 * @throws IOException if the network stays down, the write fails, or the wait is interrupted
	 *         before sending; SocketTimeoutException if no response arrives in time
	 */
	@SuppressWarnings("unchecked")
	public static Map<String, Object> sendReqMsg(List<byte[]> dataList,int sequenceId) throws IOException {
		if(dataList ==null) {   
			return null;
		}
		if(!isNetworkConnect){
			synchronized(CmppShContainer.class){
				try{
					CmppShContainer.class.wait(1000*5);// give the network 5 seconds to recover
				}catch(InterruptedException e){
					logger.error("【cmpp】sendReqMsg发送线程中断",e);
					// Keep the interrupt visible to the caller and abort before anything is written.
					Thread.currentThread().interrupt();
					throw new java.io.InterruptedIOException("【cmpp】sendReqMsg发送线程中断");
				}
			}
			if(!isNetworkConnect){
				throw new IOException("【cmpp】sendReqMsg网络连接中断！");
			}
		} 
		OutputStream out=outStream;
		if(out==null){
			throw new IOException("【cmpp】sendReqMsg网络连接中断！");
		}
		Condition msglock = lock.newCondition(); // placeholder the receiver replaces with the response
		recMsgMap.put(sequenceId, msglock);
		boolean written=false;
		try{
			for(byte[] data:dataList){
				out.write(data);
				out.flush();
			}
			written=true;
		}finally{
			// Nothing will ever answer a request that was not sent; do not leave its entry behind.
			if(!written){
				recMsgMap.remove(sequenceId, msglock);
			}
		}
		long remainingNanos=TimeUnit.MILLISECONDS.toNanos(timeout);
		lock.lock();
		try {  
			logger.debug("【cmpp】lock.lock(),sequenceId:"+sequenceId);
			// Wait only while the placeholder is still in the map. A response that arrived
			// between the write and this point has already replaced it, so the signal that
			// was sent before we started waiting cannot be lost; the loop also absorbs
			// spurious wake-ups.
			while(recMsgMap.get(sequenceId)==msglock&&remainingNanos>0){
				remainingNanos=msglock.awaitNanos(remainingNanos);
			}
			logger.debug("【cmpp】msglock.await,sequenceId:"+sequenceId);
		} catch (InterruptedException e) {  
			logger.error("【cmpp】lock发送线程中断",e); 
			Thread.currentThread().interrupt();
		} finally {  
			lock.unlock();  
		}  
		Object respMsg = recMsgMap.remove(sequenceId); // response, or still the placeholder on timeout
		if(respMsg==null){
			logger.warn("【cmpp】respMsgnull响应超时"+sequenceId+"未收到消息"); 
			throw new SocketTimeoutException(sequenceId+" respMsgnull响应超时，未收到响应消息"); 
		}
		if(respMsg == msglock) {
			logger.warn("【cmpp】msglock响应超时"+sequenceId+"未收到消息");  
			throw new SocketTimeoutException(sequenceId+" 超时，未收到响应消息");  
		}
		logger.debug("【cmpp】return respMsg");
		return (Map<String, Object>) respMsg;
	}
//	private static int sequenceId=Constants.CMPP_SEQ_MIN_SH;//序列编号
//	private synchronized static int getSequence(){
//		++sequenceId;
//		if(sequenceId>=Constants.CMPP_SEQ_MAX_SH){
//			sequenceId=Constants.CMPP_SEQ_MIN_SH;
//		}
//		return sequenceId;
//	}
	private synchronized static int getSequence(){
		Long seqId=redisDao.incrRes(Constants.SMS_REDIS_INCR_CMPP2, Constants.SEQ_MIN, Constants.SEQ_MAX+"");
		return seqId.intValue();
	}
	class ExeSendQuene implements Runnable{
		@Override
		public void run() {
			logger.info("【smpp】启动从队列获取消息发送线程");
			while(true){
				takeSendMsg();
			}
		}
		
	}
	private class ReceiveWorker implements Runnable {
		boolean readFlag=true; 
		public void run() {
			logger.info("【cmppsh】启动消息接收线程,readFlag:"+readFlag);
			while (readFlag) {
				
				try {
					byte [] headBytes = new byte[CmppCommand.MsgHeaderLength];
					byte totalLenBytes[]=new byte[CmppCommand.MsgTotalLength];
					byte commandIdBytes[]=new byte[CmppCommand.MsgCommandId];
					byte sequenceIdBytes[]=new byte[CmppCommand.MsgSequenceId];
//					logger.debug("receive Thread read");
					int headLen=inStream.read(headBytes); //阻塞
					logger.warn("【cmppsh】headLen:"+headLen);
					if (headLen == -1) {
						logger.warn("【cmpp 读到流未尾，对方已关闭流!,发起重连】");
						return;
//						continue;
					}else{
						System.arraycopy(headBytes,0,totalLenBytes,0,4);
						System.arraycopy(headBytes,4,commandIdBytes,0,4);
						System.arraycopy(headBytes,8,sequenceIdBytes,0,4);
					}
					int totalLen=ByteConvert.bytesToInt(totalLenBytes);
					logger.debug("【cmpp获取数据长度】totalLen:"+totalLen);
					byte bodyBytes[]=new byte[totalLen-CmppCommand.MsgHeaderLength];
					int bodyLen=inStream.read(bodyBytes);
					logger.debug("【cmpp获取body数据】bodyLen:"+bodyLen);
					int sequenceId=ByteConvert.bytesToInt(sequenceIdBytes);
					int commandId=ByteConvert.bytesToInt(commandIdBytes);
					Map<String, Object> map = new HashMap<String, Object>();
	                map.put("commandId", commandId);
					switch(commandId){
						case CmppCommand.CMPP_CONNECT_RESP:
							MsgConnectResp connectResp=new MsgConnectResp(totalLen, commandId, sequenceId, bodyBytes);
							map.put("resp", connectResp);
							logger.info("【cmpp链接短信网关回值】状态:{},序列号:{}",new Object[]{connectResp.getStatusStr(),connectResp.getSequenceId()});
							break;
						case CmppCommand.CMPP_SUBMIT_RESP:
							MsgSubmitResp submitResp=new MsgSubmitResp(totalLen, commandId, sequenceId,bodyBytes);
							map.put("resp", submitResp);
							logger.info("【cmppSH向用户下发短信回值】状态码:{},序列号:{},msgId:{}",new Object[]{submitResp.getResult(),submitResp.getSequenceId(),submitResp.getMsgId()});
							//根据序号取返回给短信网关的消息id
//							String respId=redisDao.get(Constants.SMS_REDIS_SEQ_CMPPSH+sequenceId);
							Map<String,String> respMap=redisDao.getMap(Constants.SMS_REDIS_SEQ_CMPPSH+sequenceId);
							//存redis 有效期24小时 msgid 序列号
							if(!respMap.isEmpty()){
								logger.info("【cmppSh redis保存msgid_respid】key:{},respId:{},outId:{},seqId:{}",new Object[]{Constants.SMS_REDIS_MSGID_CMPPSH+submitResp.getMsgId(),respMap.get("respId"),respMap.get("outId"),respMap.get("seqId")});
//								redisDao.setExpire(Constants.SMS_REDIS_MSGID_CMPPSH+submitResp.getMsgId(), respId, Constants.SMS_EXPIRE_SECONDS);
								redisDao.setMapExpire(Constants.SMS_REDIS_MSGID_CMPPSH+submitResp.getMsgId(), respMap, Constants.SMS_EXPIRE_SECONDS);
							}else{
								logger.warn("【cmppSh redis保存msgid_respid】未取到序号"+Constants.SMS_REDIS_SEQ_CMPPSH+sequenceId+"对应的respId");
							}
							break;
						case CmppCommand.CMPP_TERMINATE_RESP:
							logger.info("【cmpp】接收到终止链接响应)");
							logger.info("【cmpp拆除与ISMG的链接回值】序列号："+sequenceId);
							break;
						case CmppCommand.CMPP_ACTIVE_TEST_RESP:
							MsgActiveTestResp activeResp=new MsgActiveTestResp(totalLen, commandId, sequenceId,bodyBytes);
							map.put("resp", activeResp);
							logger.info("【cmpp短信网关与ISMG进行连接检查回复】序列号："+sequenceId);

							int selfsequenceId=getSequence();
							logger.info("【cmpp 中断请求】"+selfsequenceId);
							MsgHead head=new MsgHead();
							head.setTotalLength(12);//消息总长度，级总字节数:4+4+4(消息头)+6+16+1+4(消息主体)
							head.setCommandId(CmppCommand.CMPP_TERMINATE);//标识创建连接
							head.setSequenceId(sequenceId);//序列，由我们指定
							//outStream.write(head.toByteArry());
							//outStream.flush();
							break;
						case CmppCommand.CMPP_ACTIVE_TEST:
							logger.info("【cmpp短信网关收到ISMG发送的链路检测】CMPP_ACTIVE_TEST 序列号："+sequenceId);
							MsgActiveTestResp msgActiveResp=new MsgActiveTestResp(12+1,CmppCommand.CMPP_ACTIVE_TEST_RESP,sequenceId,null);
							byte b='0';
							msgActiveResp.setReserved(b);
							//进行回复
							outStream.write(msgActiveResp.toByteArry());
							outStream.flush();
							break;
						case CmppCommand.CMPP_DELIVER:
							logger.debug("【cmpp接收到ISMG DELIVER 消息】");
							MsgDeliver msgDeliver=new MsgDeliver(totalLen, commandId, sequenceId,bodyBytes);
							if(msgDeliver.getResult()==0){
								logger.info("【cmpp DELIVER 消息】序列号："+sequenceId+",是否状态报告回复"+(msgDeliver.getRegistered_Delivery()==0?"不是,消息内容："+msgDeliver.getMsg_Content():"是,目的手机号："+msgDeliver.getDest_terminal_Id()+",msgId:"+msgDeliver.getMsg_Id()+",reportId:"+msgDeliver.getMsg_Id_report()));
								if(msgDeliver.getRegistered_Delivery()==1){
									//状态报告 1：状态报告
									SendMsgReport report=new SendMsgReport(1,Constants.SMS_CARRIER_TYPE_CMPPSH);
									report.setMsgId(msgDeliver.getMsg_Id_report()+"");
									report.setTel(msgDeliver.getDest_terminal_Id().trim());
									report.setStat(msgDeliver.getStat().trim());
									report.setSubmitTime(msgDeliver.getSubmit_time());
									report.setDoneTime(msgDeliver.getDone_time());
									//异步推送
//									String reportStr=JacksonUtil.writeValueAsString(report);
//									logger.info("【cmpp 推送状态报告】"+reportStr);
//									String cmppNoticeUrl=ReadConfigation.getConfigItem("cmppNoticeUrl_sh");
//									pushDataService.postDataByAsynMode(cmppNoticeUrl, reportStr,"cmpp_sh");
//									
									
									report.setCarrierMsgId(msgDeliver.getMsg_Id_report()+"");
									//需要根据状态报告的msgId取对应的返回网关的id
									String reportId=Constants.SMS_REDIS_MSGID_CMPPSH+msgDeliver.getMsg_Id_report();
//									String msgId=redisDao.get(reportId);
									Map<String, String> rpMap=redisDao.getMap(reportId);
									if(!rpMap.isEmpty()){
										report.setMsgId(rpMap.get("respId"));
										report.setSeqId(rpMap.get("seqId"));
										report.setOutId(rpMap.get("outId"));
										//异步推送
										String reportStr=JacksonUtil.writeValueAsString(report);
										logger.info("【cmppsh 推送状态报告】"+reportStr);
//										String smgpNoticeUrl=ReadConfigation.getConfigItem("cmppNoticeUrl_sh");
//										pushDataService.postDataByAsynMode(smgpNoticeUrl, reportStr,"cmpp_sh");	
										pushInfo(reportStr);
										//report批量入库 放对列
										//SgwQueue.putReportQueue(report);
									}else{
										logger.warn("【cmppsh redis状态报告】没取到reportId:"+reportId+"对应的msgId");
									}
									
								}else{
									//上行消息 0：非状态报告
									SendMsgMo mo=new SendMsgMo(0);
									mo.setTel(msgDeliver.getSrc_terminal_Id().trim());
									mo.setMoCode(msgDeliver.getDest_Id().trim());
									mo.setContent(msgDeliver.getMsg_Content());
									mo.setSubmitTime(DateUtils.formatMdhmsDate(new Date()));
									//异步推送
									String moStr=JacksonUtil.writeValueAsString(mo);
									logger.info("【cmpp 推送上行消息】"+moStr);
//									String cmppNoticeUrl=ReadConfigation.getConfigItem("cmppNoticeUrl_sh");
//									pushDataService.postDataByAsynMode(cmppNoticeUrl, moStr,"cmpp_sh");	
									pushInfo(moStr);
								}
							}else{
								logger.warn("【cmpp DELIVER 接收 解析失败】序列号："+sequenceId);
							}
							MsgDeliverResp msgDeliverResp=new MsgDeliverResp();
							msgDeliverResp.setTotalLength(12+8+4);
							msgDeliverResp.setCommandId(CmppCommand.CMPP_DELIVER_RESP);
							msgDeliverResp.setSequenceId(sequenceId);
							msgDeliverResp.setMsg_Id(msgDeliver.getMsg_Id());
							msgDeliverResp.setResult(5);
							//进行回复
							outStream.write(msgDeliverResp.toByteArry());
							outStream.flush();
							break;
						default:
							logger.error("【cmpp无法解析IMSP返回的包结构】包长度为:"+totalLen);
							break;
					}
					if(commandId==CmppCommand.CMPP_DELIVER){
						logger.debug("【cmpp 收到DELIVER继续】");
						continue;
					}
					logger.debug("【cmpp get sequenceId】"+sequenceId);
//					Condition msglock =(Condition) recMsgMap.get(sequenceId); 
					Object recObj=recMsgMap.get(sequenceId);
					if(recObj==null){
						logger.warn("【cmpp recObj】"+sequenceId+"序号可能已被注销!接收到消息丢弃");  
						continue;
					}
					String recObjSimpleName=recObj.getClass().getSimpleName();
					Condition msglock =null;
					logger.debug("【cmpp】getSimpleName:"+recObjSimpleName);
					if(recObjSimpleName.equals("ConditionObject")){
						msglock =(Condition) recObj;
						recMsgMap.put(sequenceId, map);  
					}else{
						logger.warn("【cmpp】sequenceId:"+sequenceId+"非ConditionObject接收到消息丢弃");  
						continue;
					}

	                try{  
	                	lock.lock();  
	                    msglock.signalAll();  //唤醒
	                    logger.debug("【cmpp】msglock.signalAll()");
	                }finally {
	                    lock.unlock();  
	                    logger.debug("【cmpp】lock.unlock()");
	                }
				}catch(IOException e){
					e.printStackTrace();
					logger.error("【cmpp读消息IO错误】"+e.getMessage());
					try {
						readFlag=false;
						inStream.close();
						outStream.close();
					} catch (IOException e1) {
						e1.printStackTrace();
					}
					if(!isReConnection){
						logger.warn("【cmpp读消息IO错误,发起重连】"+e.getMessage());
						reConnection();
					}
					e.printStackTrace();
				}catch(Exception e){
					logger.error("【cmpp读消息错误】"+e.getMessage());
					e.printStackTrace();
				}
			}//while结束
		}
	}
}

